package com.atguigu.app.dwd;

import com.atguigu.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * Flink SQL job that builds the {@code dwd_trade_order_detail} table.
 *
 * <p>Processing steps, in the order they are executed by {@link #main(String[])}:
 * <ol>
 *   <li>declare the Kafka source table {@code topic_db};</li>
 *   <li>declare the JDBC lookup table {@code base_dic};</li>
 *   <li>filter {@code topic_db} into four single-table views
 *       ({@code order_detail}, {@code order_info}, {@code order_detail_activity},
 *       {@code order_detail_coupon}), keeping only {@code insert} records;</li>
 *   <li>join the four views with the lookup table;</li>
 *   <li>declare the target table {@code dwd_trade_order_detail};</li>
 *   <li>insert the join result into the target table.</li>
 * </ol>
 */
public class DwdTradeOrderDetailApp {

    /**
     * Idle state retention applied to the table environment (10 seconds).
     * The regular joins below keep rows in state while waiting for their
     * counterparts; this bounds how long idle state is retained.
     */
    private static final Duration IDLE_STATE_RETENTION = Duration.ofSeconds(10);

    /**
     * Entry point: sets up the environments, registers all tables and views,
     * then submits the {@code insert into dwd_trade_order_detail} statement.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {

        // 0. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setIdleStateRetention(IDLE_STATE_RETENTION);

        // 1. Source table backed by Kafka topic topic_db
        createTopicDbSource(tableEnv);

        // 2. Lookup table
        createBaseDicLookup(tableEnv);

        // 3. Four single-table views filtered from the source table
        registerOrderDetailView(tableEnv);
        registerOrderInfoView(tableEnv);
        registerOrderDetailActivityView(tableEnv);
        registerOrderDetailCouponView(tableEnv);

        // 5. Target table (must exist before the insert statement is planned)
        createTargetTable(tableEnv);

        // 4 + 6. Join the four views with the lookup table and write the result to the target table
        String insertSQL = "insert into dwd_trade_order_detail " + buildJoinSql();
        tableEnv.executeSql(insertSQL);
    }

    /** Declares the Kafka-backed source table {@code topic_db} with a processing-time attribute. */
    private static void createTopicDbSource(StreamTableEnvironment tableEnv) {
        String createTopicDBSQL = "  CREATE TABLE topic_db (\n" +
                "         `database`    STRING,\n" +
                "          `table` STRING,\n" +
                "         `type` STRING,\n" +
                "           `ts` STRING,\n" +
                "           `data` MAP<STRING,STRING>,\n" +
                "           `old` MAP<STRING,STRING>,\n" +
                "          `proc_time` as proctime()\n" +
                "         ) WITH (\n" +
                "            'connector' = 'kafka',\n" +
                "          'topic' = 'topic_db',\n" +
                "           'properties.bootstrap.servers' = 'hadoop102:9092',\n" +
                "           'properties.group.id' = 'dwd_trade_order_detailapp',\n" +
                "           'scan.startup.mode' = 'group-offsets',\n" +
                // Explicit reset policy so that a consumer group without committed
                // offsets (e.g. the very first start) has a defined start position.
                "           'properties.auto.offset.reset' = 'latest',\n" +
                "           'format' = 'json'\n" +
                "         )";
        tableEnv.executeSql(createTopicDBSQL);
    }

    /**
     * Declares the JDBC lookup table {@code base_dic} (dictionary code to name).
     *
     * <p>NOTE(review): connection URL and credentials are hard-coded here;
     * consider moving them to external configuration.
     */
    private static void createBaseDicLookup(StreamTableEnvironment tableEnv) {
        String createLookupTableSQL = "  CREATE TABLE base_dic (\n" +
                "          dic_code STRING,\n" +
                "          dic_name STRING,\n" +
                "          PRIMARY KEY (dic_code) NOT ENFORCED\n" +
                "        ) WITH (\n" +
                "           'connector' = 'jdbc',\n" +
                "           'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "           'url' = 'jdbc:mysql://hadoop102:3306/gmall-220718-flink',\n" +
                "           'table-name' = 'base_dic',\n" +
                "          'username' = 'root',\n" +
                "          'password' = '000000',\n" +
                "          'lookup.cache.max-rows' = '1000',\n" +
                "          'lookup.cache.ttl' = '60s'\n" + // cache frequently used (hot) rows
                "         ) ";
        tableEnv.executeSql(createLookupTableSQL);
    }

    /**
     * Registers view {@code order_detail}: insert records of table {@code order_detail},
     * with {@code split_original_amount} computed as {@code sku_num * order_price}.
     * {@code proc_time} is carried along because the lookup join needs it.
     */
    private static void registerOrderDetailView(StreamTableEnvironment tableEnv) {
        String selectOrderDetailSQL = "    select\n" +
                "data['id'] id,\n" +
                "data['order_id'] order_id,\n" +
                "data['sku_id'] sku_id,\n" +
                "data['sku_name'] sku_name,\n" +
                "data['create_time'] create_time,\n" +
                "data['source_id'] source_id,\n" +
                "data['source_type'] source_type,\n" +
                "data['sku_num'] sku_num,\n" +
                "cast(cast(data['sku_num'] as decimal(16,2)) * " +
                "cast(data['order_price'] as decimal(16,2)) as String) split_original_amount,\n" +
                "data['split_total_amount'] split_total_amount,\n" +
                "data['split_activity_amount'] split_activity_amount,\n" +
                "data['split_coupon_amount'] split_coupon_amount,\n" +
                "ts,\n" +
                "proc_time\n" +
                "    from topic_db where `table`='order_detail' and type='insert' ";
        Table orderDetailTable = tableEnv.sqlQuery(selectOrderDetailSQL);
        tableEnv.createTemporaryView("order_detail", orderDetailTable);
    }

    /** Registers view {@code order_info}: id, user and province of inserted orders. */
    private static void registerOrderInfoView(StreamTableEnvironment tableEnv) {
        String orderInfoSQL = "select    \n" +
                "data['id'] id,\n" +
                "data['user_id'] user_id,\n" +
                "data['province_id'] province_id\n" +
                "         from  topic_db where `table`='order_info' and type='insert'";
        Table orderInfoTable = tableEnv.sqlQuery(orderInfoSQL);
        tableEnv.createTemporaryView("order_info", orderInfoTable);
    }

    /** Registers view {@code order_detail_activity}: activity attached to an order detail row. */
    private static void registerOrderDetailActivityView(StreamTableEnvironment tableEnv) {
        Table orderDetailActivity = tableEnv.sqlQuery("select \n" +
                "data['order_detail_id'] order_detail_id,\n" +
                "data['activity_id'] activity_id,\n" +
                "data['activity_rule_id'] activity_rule_id\n" +
                "from `topic_db`\n" +
                "where `table` = 'order_detail_activity'\n" +
                "and `type` = 'insert'\n");
        tableEnv.createTemporaryView("order_detail_activity", orderDetailActivity);
    }

    /** Registers view {@code order_detail_coupon}: coupon attached to an order detail row. */
    private static void registerOrderDetailCouponView(StreamTableEnvironment tableEnv) {
        Table orderDetailCoupon = tableEnv.sqlQuery("select\n" +
                "data['order_detail_id'] order_detail_id,\n" +
                "data['coupon_id'] coupon_id\n" +
                "from `topic_db`\n" +
                "where `table` = 'order_detail_coupon'\n" +
                "and `type` = 'insert'\n");
        tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);
    }

    /**
     * Builds the query joining the four views and the lookup table:
     * {@code order_detail} inner join {@code order_info}, left join
     * {@code order_detail_activity} and {@code order_detail_coupon}, and a
     * processing-time lookup join against {@code base_dic} on {@code source_type}.
     *
     * @return the SELECT statement; its column order matches the target table definition
     */
    private static String buildJoinSql() {
        return "        select " +
                "od.id,\n" +
                "od.order_id,\n" +
                "oi.user_id,\n" +
                "od.sku_id,\n" +
                "od.sku_name,\n" +
                "oi.province_id,\n" +
                "oa.activity_id,\n" +
                "oa.activity_rule_id,\n" +
                "oc.coupon_id,\n" +
                "date_format(od.create_time, 'yyyy-MM-dd') date_id,\n" +
                "od.create_time,\n" +
                "od.source_id,\n" +
                "od.source_type,\n" +
                "bd.dic_name source_type_name,\n" +
                "od.sku_num,\n" +
                "od.split_original_amount,\n" +
                "od.split_activity_amount,\n" +
                "od.split_coupon_amount,\n" +
                "od.split_total_amount,\n" +
                "od.ts \n" +
                " from\n" +
                "         order_detail od join order_info oi on od.order_id=oi.id\n" +
                "         left join order_detail_activity oa on od.id=oa.order_detail_id\n" +
                "          left join order_detail_coupon oc on od.id=oc.order_detail_id\n" +
                "          join  base_dic FOR SYSTEM_TIME AS OF od.proc_time as bd  on  bd.dic_code=od.source_type\n";
    }

    /**
     * Declares the target table {@code dwd_trade_order_detail}, keyed by {@code id}.
     * The connector clause comes from {@code KafkaUtil.getUpsertKafkaDDL}
     * (presumably an upsert-kafka WITH clause for the given topic; see KafkaUtil).
     */
    private static void createTargetTable(StreamTableEnvironment tableEnv) {
        String createTradeOrderDetailSQL = "create table dwd_trade_order_detail(" +
                "id string,\n" +
                "order_id string,\n" +
                "user_id string,\n" +
                "sku_id string,\n" +
                "sku_name string,\n" +
                "province_id string,\n" +
                "activity_id string,\n" +
                "activity_rule_id string,\n" +
                "coupon_id string,\n" +
                "date_id string,\n" +
                "create_time string,\n" +
                "source_id string,\n" +
                "source_type string,\n" +
                "source_type_name string,\n" +
                "sku_num string,\n" +
                "split_original_amount string,\n" +
                "split_activity_amount string,\n" +
                "split_coupon_amount string,\n" +
                "split_total_amount string,\n" +
                "ts string,\n" +
                "primary key(id) not enforced\n )" +
                KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_detail");
        tableEnv.executeSql(createTradeOrderDetailSQL);
    }
}
